Package org.apache.storm.bolt
Class JoinBolt
java.lang.Object
  org.apache.storm.topology.base.BaseWindowedBolt
    org.apache.storm.bolt.JoinBolt
- All Implemented Interfaces:
Serializable, IComponent, IWindowedBolt
public class JoinBolt extends BaseWindowedBolt
- See Also:
- Serialized Form
Nested Class Summary
Nested Classes
- protected static class JoinBolt.FieldSelector
- protected class JoinBolt.JoinAccumulator
- protected static class JoinBolt.JoinInfo: Describes how to join the other stream with the current stream.
- protected static class JoinBolt.JoinType
- protected class JoinBolt.ResultRecord
- static class JoinBolt.Selector
Nested classes/interfaces inherited from class org.apache.storm.topology.base.BaseWindowedBolt
BaseWindowedBolt.Count, BaseWindowedBolt.Duration
Field Summary
Fields
- protected LinkedHashMap<String,JoinBolt.JoinInfo> joinCriteria
- protected JoinBolt.FieldSelector[] outputFields
- protected String outputStreamName
- protected JoinBolt.Selector selectorType
Fields inherited from class org.apache.storm.topology.base.BaseWindowedBolt
timestampExtractor, windowConfiguration
Method Summary
Methods
- void declareOutputFields(OutputFieldsDeclarer declarer): Declare the output schema for all the streams of this topology.
- protected JoinBolt.JoinAccumulator doInnerJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
- protected JoinBolt.JoinAccumulator doJoin(JoinBolt.JoinAccumulator probe, HashMap<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
- protected JoinBolt.JoinAccumulator doLeftJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
- protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, JoinBolt.FieldSelector[] projectionFields)
- void execute(TupleWindow inputWindow): Process the tuple window and optionally emit new tuples based on the tuples in the input window.
- protected JoinBolt.JoinAccumulator hashJoin(List<Tuple> tuples)
- JoinBolt join(String newStream, String field, String priorStream): Performs an inner join with newStream.
- JoinBolt leftJoin(String newStream, String field, String priorStream): Performs a left join with newStream.
- protected Object lookupField(JoinBolt.FieldSelector fieldSelector, Tuple tuple)
- void prepare(Map<String,Object> topoConf, TopologyContext context, OutputCollector collector): This is similar to the IBolt.prepare(Map, TopologyContext, OutputCollector), except that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
- JoinBolt select(String commaSeparatedKeys): Specify projection fields.
- BaseWindowedBolt withLag(BaseWindowedBolt.Duration duration): Specify the maximum time lag of the tuple timestamp in milliseconds.
- JoinBolt withLateTupleStream(String streamId): Specify a stream id on which late tuples are going to be emitted.
- JoinBolt withOutputStream(String streamName): Optional. Allows naming the output stream of this bolt.
- JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor): Specify the timestamp extractor implementation.
- JoinBolt withTimestampField(String fieldName): Specify a field in the tuple that represents the timestamp as a long value.
- JoinBolt withTumblingWindow(BaseWindowedBolt.Count count): A count based tumbling window.
- JoinBolt withTumblingWindow(BaseWindowedBolt.Duration duration): A time duration based tumbling window.
- BaseWindowedBolt withWatermarkInterval(BaseWindowedBolt.Duration interval): Specify the watermark event generation interval.
- JoinBolt withWindow(BaseWindowedBolt.Count windowLength): A tuple count based window that slides with every incoming tuple.
- JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Count slidingInterval): Tuple count based sliding window configuration.
- JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Duration slidingInterval): Tuple count and time duration based sliding window configuration.
- JoinBolt withWindow(BaseWindowedBolt.Duration windowLength): A time duration based window that slides with every incoming tuple.
- JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Count slidingInterval): Time duration and count based sliding window configuration.
- JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Duration slidingInterval): Time duration based sliding window configuration.
Methods inherited from class org.apache.storm.topology.base.BaseWindowedBolt
cleanup, getComponentConfiguration, getTimestampExtractor
Field Detail
-
selectorType
protected final JoinBolt.Selector selectorType
-
joinCriteria
protected LinkedHashMap<String,JoinBolt.JoinInfo> joinCriteria
-
outputFields
protected JoinBolt.FieldSelector[] outputFields
-
outputStreamName
protected String outputStreamName
Constructor Detail
-
JoinBolt
public JoinBolt(String sourceId, String fieldName)
Calls JoinBolt(Selector.SOURCE, sourceId, fieldName).
- Parameters:
sourceId - the ID of the source component (spout/bolt) from which this bolt receives data
fieldName - the field to use for joining the stream (x.y.z format)
-
JoinBolt
public JoinBolt(JoinBolt.Selector type, String srcOrStreamId, String fieldName)
Introduces the first stream to start the join with. Equivalent SQL: select ... from srcOrStreamId ...
- Parameters:
type - specifies whether srcOrStreamId refers to a stream name or a source component
srcOrStreamId - the name of a stream OR of a source component
fieldName - the field to use for joining the stream (x.y.z format)
Method Detail
-
withOutputStream
public JoinBolt withOutputStream(String streamName)
Optional. Allows naming the output stream of this bolt. If not specified, tuples are emitted on the 'default' stream.
-
join
public JoinBolt join(String newStream, String field, String priorStream)
Performs an inner join with newStream.
SQL: from priorStream inner join newStream on newStream.field = priorStream.field1
Same as: new JoinBolt(priorStream, field1).join(newStream, field, priorStream);
Note: priorStream must already be part of the join.
Valid example: new JoinBolt(s1, k1).join(s2, k2, s1).join(s3, k3, s2);
Invalid example: new JoinBolt(s1, k1).join(s3, k3, s2).join(s2, k2, s1); (s2 is referenced before it has been joined)
- Parameters:
newStream - either a stream name or the name of an upstream component
field - the field of newStream on which to perform the join
priorStream - a stream that is already part of the join
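The ordering rule in the note can be modeled without Storm. The sketch below is a hypothetical stand-alone checker (`JoinChainCheck` and its `Step` type are illustrative names, not Storm classes): it starts from the stream passed to the constructor and accepts each `join(newStream, field, priorStream)` step only if `priorStream` is already part of the join. It additionally assumes that a stream may enter the join at most once.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the join-ordering rule; not part of Storm.
final class JoinChainCheck {

    // One join(newStream, field, priorStream) call. The join field does not
    // affect ordering, so it is omitted here.
    static final class Step {
        final String newStream;
        final String priorStream;

        Step(String newStream, String priorStream) {
            this.newStream = newStream;
            this.priorStream = priorStream;
        }
    }

    // Returns true if each step's priorStream is already part of the join
    // and each newStream enters the join only once (an assumption).
    static boolean isValid(String firstStream, List<Step> steps) {
        Set<String> joined = new HashSet<>();
        joined.add(firstStream); // the stream introduced by the constructor
        for (Step step : steps) {
            if (!joined.contains(step.priorStream)) {
                return false; // priorStream has not been joined yet
            }
            if (!joined.add(step.newStream)) {
                return false; // newStream is already part of the join
            }
        }
        return true;
    }
}
```

With this model, the valid example (s2 joined to s1, then s3 joined to s2) passes, while the invalid one fails at its first step because s2 has not yet been joined.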
-
leftJoin
public JoinBolt leftJoin(String newStream, String field, String priorStream)
Performs a left join with newStream.
SQL: from stream1 left join stream2 on stream2.field = stream1.field1
Same as: new JoinBolt(stream1, field1).leftJoin(stream2, field, stream1);
Note: priorStream must already be part of the join.
Valid example: new JoinBolt(s1, k1).leftJoin(s2, k2, s1).leftJoin(s3, k3, s2);
Invalid example: new JoinBolt(s1, k1).leftJoin(s3, k3, s2).leftJoin(s2, k2, s1); (s2 is referenced before it has been joined)
- Parameters:
newStream - either the name of a stream or of an upstream component
field - the field of newStream on which to perform the join
priorStream - a stream that is already part of the join
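The difference between `join` and `leftJoin` can be illustrated with a plain in-memory hash join. This is a simplified sketch under stated assumptions, not JoinBolt's actual code: rows are modeled as `Map<String, Object>` instead of Storm `Tuple`s, and the hypothetical `HashJoinSketch.join` indexes the new stream by its join field (build phase) and then looks up each prior-stream row (probe phase). With `leftJoin = true`, prior-stream rows that have no match are kept and paired with `null`, as in the SQL left join above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of inner vs. left hash joins; not JoinBolt's code.
// Rows are plain maps standing in for Storm tuples.
final class HashJoinSketch {

    // A prior-stream row paired with a matching new-stream row (null if unmatched).
    static final class Pair {
        final Map<String, Object> prior;
        final Map<String, Object> matched;

        Pair(Map<String, Object> prior, Map<String, Object> matched) {
            this.prior = prior;
            this.matched = matched;
        }
    }

    static List<Pair> join(List<Map<String, Object>> priorRows, String priorField,
                           List<Map<String, Object>> newRows, String newField,
                           boolean leftJoin) {
        // Build phase: index new-stream rows by the value of their join field.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> row : newRows) {
            index.computeIfAbsent(row.get(newField), k -> new ArrayList<>()).add(row);
        }
        // Probe phase: look up each prior-stream row's join value in the index.
        List<Pair> result = new ArrayList<>();
        for (Map<String, Object> row : priorRows) {
            List<Map<String, Object>> matches = index.get(row.get(priorField));
            if (matches != null) {
                for (Map<String, Object> match : matches) {
                    result.add(new Pair(row, match));
                }
            } else if (leftJoin) {
                result.add(new Pair(row, null)); // left join keeps unmatched prior rows
            }
        }
        return result;
    }
}
```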
-
select
public JoinBolt select(String commaSeparatedKeys)
Specify projection fields, i.e. the fields to include in the output. Example: .select("field1, stream2:field2, field3"). Nested key names are supported for nested types, e.g. .select("outerKey1.innerKey1, outerKey1.innerKey2, stream3:outerKey2.innerKey3"). Inner (non-leaf) types must be Map<> to support nested lookup using this dot notation. The selected fields implicitly declare the output field names of this bolt.
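The descriptor notation accepted by `select` can be sketched as follows. `FieldPath` is a hypothetical class, not Storm's `JoinBolt.FieldSelector`; it assumes a descriptor is an optional `stream:` prefix followed by a dot-separated key path, and that every non-leaf value is a `Map`, as the description above requires for nested lookup.

```java
import java.util.Map;

// Hypothetical model of select() descriptors such as "stream3:outerKey2.innerKey3".
// Not Storm's JoinBolt.FieldSelector; it only illustrates the notation.
final class FieldPath {
    final String stream; // null when the descriptor has no "stream:" prefix
    final String[] keys; // dot-separated path, e.g. {"outerKey2", "innerKey3"}

    FieldPath(String descriptor) {
        String d = descriptor.trim();
        int colon = d.indexOf(':');
        this.stream = colon >= 0 ? d.substring(0, colon) : null;
        this.keys = d.substring(colon + 1).split("\\.");
    }

    // Splits a comma-separated select() argument into descriptors.
    static FieldPath[] parseAll(String commaSeparatedKeys) {
        String[] parts = commaSeparatedKeys.split(",");
        FieldPath[] paths = new FieldPath[parts.length];
        for (int i = 0; i < parts.length; i++) {
            paths[i] = new FieldPath(parts[i]);
        }
        return paths;
    }

    // Follows the key path through nested maps; returns null if a non-leaf
    // value is not a Map or a key is missing.
    Object lookup(Map<String, Object> fields) {
        Object current = fields;
        for (String key : keys) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }
}
```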
-
declareOutputFields
public void declareOutputFields(OutputFieldsDeclarer declarer)
Description copied from interface: IComponent
Declare the output schema for all the streams of this topology.
- Specified by:
declareOutputFields in interface IComponent
- Overrides:
declareOutputFields in class BaseWindowedBolt
- Parameters:
declarer - this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
-
prepare
public void prepare(Map<String,Object> topoConf, TopologyContext context, OutputCollector collector)
Description copied from interface: IWindowedBolt
This is similar to the IBolt.prepare(Map, TopologyContext, OutputCollector), except that while emitting, the tuples are automatically anchored to the tuples in the inputWindow.
- Specified by:
prepare in interface IWindowedBolt
- Overrides:
prepare in class BaseWindowedBolt
-
execute
public void execute(TupleWindow inputWindow)
Description copied from interface: IWindowedBolt
Process the tuple window and optionally emit new tuples based on the tuples in the input window.
-
hashJoin
protected JoinBolt.JoinAccumulator hashJoin(List<Tuple> tuples)
-
doJoin
protected JoinBolt.JoinAccumulator doJoin(JoinBolt.JoinAccumulator probe, HashMap<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
-
doInnerJoin
protected JoinBolt.JoinAccumulator doInnerJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
-
doLeftJoin
protected JoinBolt.JoinAccumulator doLeftJoin(JoinBolt.JoinAccumulator probe, Map<Object,ArrayList<Tuple>> buildInput, JoinBolt.JoinInfo joinInfo, boolean finalJoin)
-
doProjection
protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, JoinBolt.FieldSelector[] projectionFields)
-
lookupField
protected Object lookupField(JoinBolt.FieldSelector fieldSelector, Tuple tuple)
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Count slidingInterval)
Description copied from class: BaseWindowedBolt
Tuple count based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength - the number of tuples in the window
slidingInterval - the number of tuples after which the window slides
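A count-based sliding window can be modeled with a bounded deque. The sketch below (`CountSlidingWindow` is a hypothetical name; this is not Storm's windowing code) keeps the most recent `windowLength` tuples and returns a copy of the window each time `slidingInterval` new tuples have arrived.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a count-based sliding window; not Storm's windowing code.
final class CountSlidingWindow<T> {
    private final int windowLength;    // maximum number of tuples in a window
    private final int slidingInterval; // number of new tuples between windows
    private final Deque<T> buffer = new ArrayDeque<>();
    private int sinceLastWindow = 0;

    CountSlidingWindow(int windowLength, int slidingInterval) {
        if (windowLength <= 0 || slidingInterval <= 0) {
            throw new IllegalArgumentException("window length and sliding interval must be positive");
        }
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
    }

    // Adds a tuple; returns a snapshot of the window when it slides, otherwise null.
    List<T> add(T tuple) {
        buffer.addLast(tuple);
        if (buffer.size() > windowLength) {
            buffer.removeFirst(); // evict the oldest tuple
        }
        if (++sinceLastWindow == slidingInterval) {
            sinceLastWindow = 0;
            return new ArrayList<>(buffer);
        }
        return null;
    }
}
```

In this model, a window length of 3 with a sliding interval of 2 turns tuples 1 through 6 into the windows [1, 2], [2, 3, 4] and [4, 5, 6]; the first window is smaller because the buffer has not filled yet.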
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Duration slidingInterval)
Description copied from class: BaseWindowedBolt
Tuple count and time duration based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength - the number of tuples in the window
slidingInterval - the time duration after which the window slides
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Count slidingInterval)
Description copied from class: BaseWindowedBolt
Time duration and count based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength - the time duration of the window
slidingInterval - the number of tuples after which the window slides
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Duration slidingInterval)
Description copied from class: BaseWindowedBolt
Time duration based sliding window configuration.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength - the time duration of the window
slidingInterval - the time duration after which the window slides
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Count windowLength)
Description copied from class: BaseWindowedBolt
A tuple count based window that slides with every incoming tuple.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength - the number of tuples in the window
-
withWindow
public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength)
Description copied from class: BaseWindowedBolt
A time duration based window that slides with every incoming tuple.
- Overrides:
withWindow in class BaseWindowedBolt
- Parameters:
windowLength - the time duration of the window
-
withTumblingWindow
public JoinBolt withTumblingWindow(BaseWindowedBolt.Count count)
Description copied from class: BaseWindowedBolt
A count based tumbling window.
- Overrides:
withTumblingWindow in class BaseWindowedBolt
- Parameters:
count - the number of tuples after which the window tumbles
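A count-based tumbling window is the case in which windows do not overlap: each batch of `count` tuples is processed once and then discarded. The hypothetical helper below groups an in-memory list that way; it illustrates the concept only and is not how Storm buffers tuples.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a count-based tumbling window over an in-memory list.
final class CountTumblingWindow {
    // Splits tuples into consecutive, non-overlapping batches of `count` tuples.
    // A trailing partial batch is not returned, since that window has not closed yet.
    static <T> List<List<T>> batches(List<T> tuples, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start + count <= tuples.size(); start += count) {
            windows.add(new ArrayList<>(tuples.subList(start, start + count)));
        }
        return windows;
    }
}
```

Grouping tuples 1 through 7 with a count of 3 yields [1, 2, 3] and [4, 5, 6]; tuple 7 waits until its batch fills.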
-
withTumblingWindow
public JoinBolt withTumblingWindow(BaseWindowedBolt.Duration duration)
Description copied from class: BaseWindowedBolt
A time duration based tumbling window.
- Overrides:
withTumblingWindow in class BaseWindowedBolt
- Parameters:
duration - the time duration after which the window tumbles
-
withTimestampField
public JoinBolt withTimestampField(String fieldName)
Description copied from class: BaseWindowedBolt
Specify a field in the tuple that represents the timestamp as a long value. If this field is not present in the incoming tuple, an IllegalArgumentException will be thrown. The field MUST contain a timestamp in milliseconds.
- Overrides:
withTimestampField in class BaseWindowedBolt
- Parameters:
fieldName - the name of the field that contains the timestamp
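The contract described above (the named field must exist and must hold a timestamp in milliseconds) can be sketched with a tuple modeled as a `Map<String, Object>`. `TimestampFieldExtractor` is a hypothetical name, unrelated to Storm's `TimestampExtractor` interface, and the sketch assumes the field holds a `java.lang.Long`.

```java
import java.util.Map;

// Hypothetical helper illustrating the withTimestampField contract;
// not Storm's TimestampExtractor. Tuples are modeled as maps.
final class TimestampFieldExtractor {
    private final String fieldName;

    TimestampFieldExtractor(String fieldName) {
        this.fieldName = fieldName;
    }

    // Returns the timestamp in milliseconds; assumes the field holds a Long.
    long extract(Map<String, Object> tuple) {
        if (!tuple.containsKey(fieldName)) {
            throw new IllegalArgumentException("Tuple has no timestamp field '" + fieldName + "'");
        }
        return (Long) tuple.get(fieldName);
    }
}
```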
-
withTimestampExtractor
public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
Description copied from class: BaseWindowedBolt
Specify the timestamp extractor implementation.
- Overrides:
withTimestampExtractor in class BaseWindowedBolt
- Parameters:
timestampExtractor - the TimestampExtractor implementation
-
withLateTupleStream
public JoinBolt withLateTupleStream(String streamId)
Description copied from class: BaseWindowedBolt
Specify a stream id on which late tuples are going to be emitted. They are accessible via the WindowedBoltExecutor.LATE_TUPLE_FIELD field. It must be defined on a per-component basis and in conjunction with BaseWindowedBolt.withTimestampField(java.lang.String); otherwise, an IllegalArgumentException will be thrown.
- Overrides:
withLateTupleStream in class BaseWindowedBolt
- Parameters:
streamId - the name of the stream used to emit late tuples on
-
withLag
public BaseWindowedBolt withLag(BaseWindowedBolt.Duration duration)
Description copied from class: BaseWindowedBolt
Specify the maximum time lag of the tuple timestamp in milliseconds. This means that tuple timestamps cannot be out of order by more than this amount.
- Overrides:
withLag in class BaseWindowedBolt
- Parameters:
duration - the max lag duration
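The effect of the lag setting can be illustrated with a simplified single-stream model. It assumes that the watermark trails the largest timestamp seen so far by the lag, and that a tuple whose timestamp falls behind the current watermark counts as late. `LagTracker` is a hypothetical class; unlike this sketch, Storm generates watermark events periodically (see `withWatermarkInterval`) rather than after every tuple.

```java
// Simplified single-stream model of lag handling; not Storm's watermark code.
// Assumption: the watermark is recomputed after every tuple.
final class LagTracker {
    private final long lagMs;
    private long maxSeenTs = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    LagTracker(long lagMs) {
        this.lagMs = lagMs;
    }

    // Returns true if the tuple is late, i.e. its timestamp is behind the
    // current watermark; otherwise records it and advances the watermark.
    boolean isLate(long timestampMs) {
        if (timestampMs < watermark) {
            return true;
        }
        maxSeenTs = Math.max(maxSeenTs, timestampMs);
        watermark = maxSeenTs - lagMs;
        return false;
    }
}
```

With a lag of 5,000 ms, once a tuple stamped 10,000 has arrived, a tuple stamped 7,000 is still accepted, but one stamped 4,000 is reported as late.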
-
withWatermarkInterval
public BaseWindowedBolt withWatermarkInterval(BaseWindowedBolt.Duration interval)
Description copied from class: BaseWindowedBolt
Specify the watermark event generation interval. For tuple-based timestamps, watermark events are used to track the progress of time.
- Overrides:
withWatermarkInterval in class BaseWindowedBolt
- Parameters:
interval - the interval at which watermark events are generated